Cleanup MTP async data consumer #7423
Conversation
Pull request overview
This pull request refactors the asynchronous data consumer logic in the Microsoft Testing Platform to eliminate arbitrary Task.Delay calls during the drain operation. The changes simplify DrainDataAsync by replacing the retry-with-backoff mechanism with a more deterministic approach: completing the current channel, waiting for all messages to be consumed, then creating a fresh channel for any subsequent messages.
Changes:
- Removed loop detection logic from `DrainDataAsync` that previously detected and threw exceptions for publisher/consumer loops
- Changed `DrainDataAsync` from tracking message counts with delays to a simpler complete-and-restart approach
- Removed the `IEnvironment` dependency and the `TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS` environment variable
- Changed channel and consume task fields from readonly to mutable to support channel recreation
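The complete-and-restart drain described above can be sketched with a minimal Python asyncio analogue (the real implementation uses C# channels; the class and member names below are hypothetical, not from the PR): complete the current queue, wait for the consumer to finish everything enqueued, then start a fresh queue and consumer for subsequent messages.

```python
# Conceptual sketch only: Python stand-in for the C# complete-and-restart drain.
import asyncio

_DONE = object()  # hypothetical sentinel standing in for channel completion


class AsyncConsumerDataProcessor:
    def __init__(self):
        self.processed = []
        self._start()

    def _start(self):
        # Fresh "channel" plus a consumer task draining it.
        self._queue = asyncio.Queue()
        self._consume_task = asyncio.create_task(self._consume())

    async def _consume(self):
        while True:
            item = await self._queue.get()
            if item is _DONE:  # channel completed: stop consuming
                return
            self.processed.append(item)

    async def publish(self, item):
        await self._queue.put(item)

    async def drain(self):
        # Complete the current channel and wait for the consumer...
        await self._queue.put(_DONE)
        await self._consume_task
        # ...then restart with a fresh channel for any later messages.
        self._start()


async def main():
    p = AsyncConsumerDataProcessor()
    await p.publish(1)
    await p.publish(2)
    await p.drain()     # everything published so far is consumed
    await p.publish(3)  # publishing still works after the drain
    await p.drain()
    return p.processed


print(asyncio.run(main()))  # → [1, 2, 3]
```

Note that this sketch only restarts the channel after the consumer has fully stopped, which is exactly the window the review comments below flag as racy when publishers run concurrently.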
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/Platform/Microsoft.Testing.Platform/Messages/IAsyncConsumerDataProcessor.cs | Changed DrainDataAsync return type from Task<long> to Task, removing message count tracking |
| src/Platform/Microsoft.Testing.Platform/Messages/AsynchronousMessageBus.cs | Removed IEnvironment parameter, loop detection logic, and simplified DrainDataAsync to iterate through processors |
| src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.net.cs | Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable |
| src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.netstandard.cs | Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable |
| src/Platform/Microsoft.Testing.Platform/Hosts/TestHostControllersTestHost.cs | Removed IEnvironment parameter from AsynchronousMessageBus constructor call |
| src/Platform/Microsoft.Testing.Platform/Hosts/TestHostBuilder.cs | Removed IEnvironment parameter from AsynchronousMessageBus constructor call |
| src/Platform/Microsoft.Testing.Platform/Helpers/EnvironmentVariableConstants.cs | Removed TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS constant as it's no longer used |
| test/UnitTests/Microsoft.Testing.Platform.UnitTests/Messages/AsynchronousMessageBusTests.cs | Updated test constructor calls to remove IEnvironment parameter and use var for type inference |
```diff
 public async Task DrainDataAsync_Loop_ShouldFail()
 {
     using MessageBusProxy proxy = new();
     LoopConsumerA consumerA = new(proxy);
     ConsumerB consumerB = new(proxy);
-    AsynchronousMessageBus asynchronousMessageBus = new(
+    var asynchronousMessageBus = new AsynchronousMessageBus(
         [consumerA, consumerB],
         new CTRLPlusCCancellationTokenSource(),
         new SystemTask(),
-        new NopLoggerFactory(),
-        new SystemEnvironment());
+        new NopLoggerFactory());
     await asynchronousMessageBus.InitAsync();
     proxy.SetBuiltMessageBus(asynchronousMessageBus);

     await proxy.PublishAsync(consumerA, new LoopDataA());

     InvalidOperationException ex = await Assert.ThrowsExactlyAsync<InvalidOperationException>(asynchronousMessageBus.DrainDataAsync);
     Assert.Contains("Publisher/Consumer loop detected during the drain after", ex.Message);

     // Prevent loop to continue
     consumerA.StopConsume();
     consumerB.StopConsume();
 }
```
This test expects DrainDataAsync to throw an InvalidOperationException when a publisher/consumer loop is detected. However, the refactored DrainDataAsync implementation no longer includes loop detection logic. The new implementation simply completes the channel, waits for the consume task, and restarts with a fresh channel. This test will now fail or hang indefinitely as the loop will continue after each drain cycle.
```diff
 public async Task DrainDataAsync()
 {
-    // We go volatile because we race with Interlocked.Increment in PublishAsync
-    long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-    long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    const int minDelayTimeMs = 25;
-    int currentDelayTimeMs = minDelayTimeMs;
-    while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
-    {
-        // When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
-        if (_cancellationToken.IsCancellationRequested)
-        {
-            break;
-        }
-
-        await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
-        currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);
-
-        if (_consumerState.Task.IsFaulted)
-        {
-            // Rethrow the exception
-            await _consumerState.Task.ConfigureAwait(false);
-        }
-
-        // Wait for the consumer to complete the current enqueued items
-        totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-        totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    }
-
-    // It' possible that we fail and we have consumed the item
-    if (_consumerState.Task.IsFaulted)
-    {
-        // Rethrow the exception
-        await _consumerState.Task.ConfigureAwait(false);
-    }
+    _channel.Complete();
+    await _consumeTask.ConfigureAwait(false);
-
-    return _totalPayloadReceived;
+    _channel = new();
+    _consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
 }
```
Critical race condition: When DrainDataAsync completes the channel (line 74) and before creating a new channel (line 77), any concurrent calls to PublishAsync will throw InvalidOperationException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.
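The synchronization the comment suggests can be illustrated with a minimal Python asyncio sketch (the PR itself is C#; `SwappingProcessor` and all member names here are invented for illustration): publishers and the drain acquire the same lock, so the drain atomically detaches the current queue and installs a fresh one, and no publisher ever observes a completed channel.

```python
# Conceptual sketch only: lock-protected atomic queue swap during drain.
import asyncio


class SwappingProcessor:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._queue = asyncio.Queue()
        self.drained = []

    async def publish(self, item):
        # Publishers take the same lock, so they always see a live queue.
        async with self._lock:
            await self._queue.put(item)

    async def drain(self):
        # Atomically detach the current queue and install a fresh one.
        async with self._lock:
            old, self._queue = self._queue, asyncio.Queue()
        # Outside the lock, consume everything the detached queue holds.
        while not old.empty():
            self.drained.append(old.get_nowait())


async def main():
    p = SwappingProcessor()
    # Publishes racing with two drains: no ChannelClosedException analogue
    # is possible, because a completed queue is never visible to publishers.
    await asyncio.gather(*(p.publish(i) for i in range(5)), p.drain(), p.drain())
    await p.drain()  # sweep anything still in the current queue
    return sorted(p.drained)


print(asyncio.run(main()))  # → [0, 1, 2, 3, 4]
```

Every published item lands in exactly one queue generation and is drained exactly once, regardless of how the publishes and drains interleave.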
```diff
 public async Task DrainDataAsync()
 {
-    // We go volatile because we race with Interlocked.Increment in PublishAsync
-    long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-    long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    const int minDelayTimeMs = 25;
-    int currentDelayTimeMs = minDelayTimeMs;
-    while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
-    {
-        // When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
-        if (_cancellationToken.IsCancellationRequested)
-        {
-            break;
-        }
-
-        await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
-        currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);
-
-        if (_consumerState.Task.IsFaulted)
-        {
-            // Rethrow the exception
-            await _consumerState.Task.ConfigureAwait(false);
-        }
-
-        // Wait for the consumer to complete the current enqueued items
-        totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-        totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    }
-
-    // It' possible that we fail and we have consumed the item
-    if (_consumerState.Task.IsFaulted)
-    {
-        // Rethrow the exception
-        await _consumerState.Task.ConfigureAwait(false);
-    }
+    _channel.Writer.Complete();
+    await _consumeTask.ConfigureAwait(false);
-
-    return _totalPayloadReceived;
+    _channel = CreateChannel();
+    _consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
 }
```
Critical race condition: When DrainDataAsync completes the channel writer (line 72) and before creating a new channel (line 75), any concurrent calls to PublishAsync will throw ChannelClosedException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.
Resolve merge conflicts caused by the TestHostBuilder split (PR #8201) by applying the IEnvironment removal in TestHostBuilder.Framework.cs, where the AsynchronousMessageBus is now constructed.

Rework AsyncConsumerDataProcessor.DrainDataAsync to use a sentinel drain marker queued through the existing channel instead of completing and recreating the channel. This addresses both Copilot review comments:

- No more channel-close race with concurrent PublishAsync calls, because the channel is never completed during a drain.
- Publisher/consumer loops are still detected: AsynchronousMessageBus loops through distinct processors until none of them processed any payload (up to MaxDrainAttempts = 5) and otherwise throws InvalidOperationException with the legacy 'Publisher/Consumer loop detected during the drain after ...' message.

IAsyncConsumerDataProcessor.DrainDataAsync now returns Task<bool> so the bus can detect when a round produced data and another round is needed.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
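The sentinel-marker drain this commit describes can be sketched in Python asyncio (illustrative only; the real code is C#, and `MarkerDrainProcessor` plus every member name here is invented): the drain enqueues a marker and awaits it, the consumer completes the marker once everything queued before it has been processed, and the channel itself is never closed, so concurrent publishers are unaffected.

```python
# Conceptual sketch only: drain via an in-band sentinel marker.
import asyncio


class MarkerDrainProcessor:
    def __init__(self):
        self._queue = asyncio.Queue()
        self.processed = []
        self._task = asyncio.create_task(self._consume())

    async def _consume(self):
        while True:
            item = await self._queue.get()
            if isinstance(item, asyncio.Future):  # drain marker reached
                item.set_result(None)             # signal: drained up to here
            else:
                self.processed.append(item)

    async def publish(self, item):
        await self._queue.put(item)

    async def drain(self):
        marker = asyncio.get_running_loop().create_future()
        await self._queue.put(marker)
        await marker  # resolves once everything queued before it is processed


async def main():
    p = MarkerDrainProcessor()
    await p.publish("a")
    await p.publish("b")
    await p.drain()
    snapshot = list(p.processed)
    await p.publish("c")  # the channel was never closed
    await p.drain()
    return snapshot, p.processed


print(asyncio.run(main()))  # → (['a', 'b'], ['a', 'b', 'c'])
```

Because the marker travels through the same FIFO channel as the payloads, awaiting it gives the same "everything enqueued so far is consumed" guarantee that completing the channel gave, without the close/recreate window.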
Pushed an updated commit that merges
Side effects:
All 2223
OneLocBuild commit 60e6270 removed the AreEquivalent translations from all FrameworkMessages.*.xlf files even though the corresponding entries were left in FrameworkMessages.resx (PR #8266). This puts every PR's CI build into an out-of-sync state. Regenerating with `msbuild /t:UpdateXlf` restores the entries.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The same hunk appears in both the AsyncConsumerDataProcessor.net.cs and AsyncConsumerDataProcessor.netstandard.cs variants:

```csharp
        }
        finally
        {
            Interlocked.Increment(ref _processedCount);
        }
```
```csharp
totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
try
{
    await _channel.Writer.WriteAsync(AsyncConsumerDataProcessorMessage.CreateDrainMarker(drainMarker), _cancellationToken).ConfigureAwait(false);
```
```csharp
// Maximum number of drain rounds before we consider that a publisher/consumer cycle exists
// and we throw to surface the bug rather than spin forever.
private const int MaxDrainAttempts = 5;
```
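The bus-level loop detection this constant supports can be sketched as follows (illustrative Python, not the actual C#; `drain_until_quiet`, `received_count`, and `FakeProcessor` are hypothetical names loosely mirroring the PR's ReceivedCount mechanism): keep running drain rounds while any processor received new payloads, and fail loudly after the attempt limit.

```python
# Conceptual sketch only: bounded drain rounds with cycle detection.
MAX_DRAIN_ATTEMPTS = 5


def drain_until_quiet(processors, max_attempts=MAX_DRAIN_ATTEMPTS):
    """processors: objects exposing .received_count and a .drain() method."""
    for _ in range(max_attempts):
        before = sum(p.received_count for p in processors)
        for p in processors:
            p.drain()
        after = sum(p.received_count for p in processors)
        if after == before:
            return  # a full round produced no new payloads: the bus is quiet
    raise RuntimeError(
        f"Publisher/Consumer loop detected during the drain after {max_attempts} attempts")


class FakeProcessor:
    # Simulates a consumer that re-publishes for a fixed number of rounds.
    def __init__(self, loops):
        self.received_count = 0
        self._loops = loops

    def drain(self):
        if self._loops > 0:
            self._loops -= 1
            self.received_count += 1  # consuming produced a new payload


drain_until_quiet([FakeProcessor(loops=2)])       # finite chain: terminates quietly
try:
    drain_until_quiet([FakeProcessor(loops=99)])  # endless cycle: gives up
except RuntimeError as e:
    print(e)  # → Publisher/Consumer loop detected during the drain after 5 attempts
```

Raising `max_attempts` plays the same role as the TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS escape hatch restored later in the thread: longer but finite producer/consumer chains can still drain, while true cycles are surfaced instead of spinning forever.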
…rride, handle OCE during drain

* AsyncConsumerDataProcessor now exposes ReceivedCount (incremented in PublishAsync). The bus uses it to detect publisher/consumer cycles across drain rounds instead of the per-processor _processedCount snapshot. The previous design under-counted in-flight consumers and could let the bus stop draining with newly produced data still pending, as flagged by Copilot review (AsyncConsumerDataProcessor.net.cs:63 and .netstandard.cs:53).
* AsynchronousMessageBus restores the TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS environment variable so that test hosts can raise the limit for longer but finite producer/consumer chains, matching the previous escape hatch. The constructor receives IEnvironment again; TestHostBuilder.Framework.cs and TestHostControllersTestHost.cs pass it through.
* AsyncConsumerDataProcessor.DrainDataAsync now catches OperationCanceledException on the drain-marker write and returns gracefully, matching the previous behavior of bailing out of the drain on cancellation (Copilot review on AsyncConsumerDataProcessor.net.cs:99).
* IAsyncConsumerDataProcessor.DrainDataAsync goes back to returning Task (no longer Task<bool>) since loop detection is now driven by ReceivedCount.
* Regenerate the TrxReport ExtensionResources.*.xlf entries that were dropped by the OneLocBuild check-in on main; otherwise the build fails before our changes are even compiled.
* Test constructor calls pass new SystemEnvironment() to the message bus.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pushed
Other notes:
Local validation:
CI is now in good shape after the latest push:
The PR is
This refactors DrainDataAsync to avoid the arbitrary Task.Delay calls and cleans up the surrounding logic.